feat(decisioning): PostgresTaskRegistry — durable HITL task state (v6.1)#361
Merged
feat(decisioning): PostgresTaskRegistry — durable HITL task state (v6.1)#361
Conversation
Closes #353 Implements the durable `TaskRegistry` promised in the v6.1 plan. Production adopters running `ADCP_ENV=production` were blocked from HITL flows because `InMemoryTaskRegistry.is_durable=False` trips the production-mode gate in `create_adcp_server_from_platform`. This PR delivers the Postgres-backed alternative. **Implementation note:** Uses `psycopg_pool.AsyncConnectionPool` (the existing `[pg]` extra) rather than SQLAlchemy ORM. The `TaskRegistry` Protocol is fully async; the async psycopg pool is a drop-in with no new dependency. See PR body for rationale. https://claude.ai/code/session_01L8ig1NtL6WZQcYge3RKrb3
- Add internal _table parameter + pre-formatted SQL (enables test isolation, mirrors PgReplayStore.table_name pattern without public API) - Add decisioning/pg/*.sql to pyproject.toml package-data so DDL ships in wheel - Fix conformance test fixture to use _table for per-test table isolation - Add complete/fail unknown-task raise tests to conformance suite - Fix stub ClassVar annotation in decisioning __init__.py - Remove unused logging import from task_registry.py - Update pg extra comment in pyproject.toml to reference PostgresTaskRegistry https://claude.ai/code/session_01L8ig1NtL6WZQcYge3RKrb3
8d9d05c to
ec4a1b1
Compare
5 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #353
Summary
Production sellers running
ADCP_ENV=productionwere blocked from HITL flows becauseInMemoryTaskRegistry.is_durable=Falsetrips the production-mode gate increate_adcp_server_from_platform. The gate error message literally says "v6.1 ships PostgresTaskRegistry" — this PR delivers it.Adds
PostgresTaskRegistry: a durableTaskRegistryimplementation backed by adecisioning_tasksPostgres table. Wiring is a one-line change for adopters:Implementation note — psycopg over SQLAlchemy: The issue referenced "SQLAlchemy ORM model." The
TaskRegistryProtocol is fully async;psycopg_pool.AsyncConnectionPool(already in the[pg]extra) is a natural drop-in with no new dependency, while SQLAlchemy async ORM would add a heavyweight dep and anAsyncSessionlifecycle that doesn't compose cleanly with the Protocol's one-connection-per-call shape. Both expert reviewers converged on psycopg.Files changed
src/adcp/decisioning/pg/__init__.pysrc/adcp/decisioning/pg/task_registry.pyPostgresTaskRegistryimplementationsrc/adcp/decisioning/pg/decisioning_tasks.sqlsrc/adcp/decisioning/__init__.py[pg]not installed)pyproject.tomldecisioning/pg/*.sqlto package-data; update[pg]extra commenttests/test_decisioning_pg_task_registry.pyis_durable, stub import,account_idvalidation)tests/conformance/decisioning/test_pg_task_registry.pyADCP_PG_TEST_URLset)Key correctness properties
get()usesWHERE task_id = %s AND (%s IS NULL OR account_id = %s)— account scoping is in theWHEREpredicate, not Python-level filter after fetch.complete()andfail()useUPDATE ... WHERE state NOT IN ('completed', 'failed') RETURNING task_id. If zero rows return, a follow-upSELECTdetermines unknown vs. idempotent vs. conflict — no TOCTOU across workers.update_progress()usesWHERE state NOT IN ('completed', 'failed')so straggler writes can't resurrect completed tasks.is_durable: ClassVar[bool] = Trueset at class level (not instance), matching whatserve.py:194checks.account_idvalidation inissue()copied verbatim fromInMemoryTaskRegistry— rejects empty, whitespace, and"<unset>"values.What's tested
pytest tests/test_decisioning_pg_task_registry.py— runs without real DB; covers stub import,is_durable,account_idvalidation, Protocol structural matchingtests/conformance/decisioning/test_pg_task_registry.py— full lifecycle, cross-tenant security, idempotency, concurrent issue — requiresADCP_PG_TEST_URLNote: The
pg-replay-storeCI job should be extended to also runtests/conformance/decisioning/test_pg_task_registry.py(cannot edit.github/workflowsfrom triage PRs — flagged as a nit for human reviewer).Pre-PR review
_tableinternal param added for test isolation, pre-formatted SQL, unusedloggingremoved, stubClassVarannotation corrected, missingcomplete/fail(unknown)conformance tests added.create_schema()+ raw DDL file covers both boot-and-go and migration-tool adopters,from adcp.decisioning import PostgresTaskRegistryis discoverable. SQL file now in wheel via pyproject.toml fix.Nits surfaced (not fixed in this PR)
pg-replay-storejob doesn't run decisioning conformance tests (.github/workflowsedit needed — out of triage PR scope)complete()afterfail()) undocumented in Protocol — follow-up to alignInMemoryTaskRegistryand document in Protocol docstringSession: https://claude.ai/code/session_01L8ig1NtL6WZQcYge3RKrb3
Generated by Claude Code